Skip to content

Conversation

@tobias-wilfert
Copy link
Member

@tobias-wilfert tobias-wilfert commented Nov 4, 2025

This ports the client_report processing logic over to the new processor. In doing so it refactors the logic to better align with the structure that can be found in the other processors. The logic should be however remain the same except for one breaking change. To simplify the logic, reports are now never forwarded but rather always processed or droped. Previously if (!config.emit_outcomes().any() || !config.emit_client_outcomes()) && !config.processing_enabled() we would forward the reports instead of processing them.

Fixes: INGEST-514

@tobias-wilfert tobias-wilfert self-assigned this Nov 7, 2025
Comment on lines 48 to 53
if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
&& config.processing_enabled()
{
// if a processing relay has client outcomes disabled we drop them without processing.
return;
}
Copy link
Member Author

@tobias-wilfert tobias-wilfert Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be:

// if client outcomes are disabled we leave the client reports unprocessed
// and pass them on.
if !config.emit_outcomes().any() || !config.emit_client_outcomes() {
    // if a processing relay has client outcomes disabled we drop them.
    if config.processing_enabled() {
        managed_envelope.retain_items(|item| match item.ty() {
            ItemType::ClientReport => ItemAction::DropSilently,
            _ => ItemAction::Keep,
        });
    }
    return;
}

Not sure however we want to still drop them when processing is enabled or if we just want to get rid of this all together.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a redundant check, if outcomes are disabled, the aggregator should drop them alltogether 🤔

@jjbayer wdyt, remove this either now or in a follow up?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, emit_client_outcomes and emit_outcomes are different config flags so it's not entirely redundant. But I would be fine with making a breaking change and remove emit_client_outcomes from the config altogether.


use super::*;

// FIXME: Ask if moving the tests over is worth the changes to ProcessEnvelopeGrouped and Submit or if they should just stay here (hard to find).
Copy link
Member Author

@tobias-wilfert tobias-wilfert Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The remaining 2 client report test are not yet moved since doing so will require us to make ProcessEnvelopeGrouped and Submit public not sure if we want to do that to have the tests in a "better spot".

/// At the moment client reports are primarily used to transfer outcomes from
/// client SDKs. The outcomes are removed here and sent directly to the outcomes
/// system.
pub fn process_client_reports(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is taken almost 1-1 from the old processing so if we think this is better broken up or rewritten to make it more elegant can still do that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should do that, left an explaining comment on why here: #5338 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, rewrote the logic now to split it up ruffly into the individual parts that you mentioned. Not sure if the code is too cute now though.

@tobias-wilfert tobias-wilfert changed the title ref(client-reports): Move Client Reports to the new processing WIP ref(client-reports): Move Client Reports to the new processing Nov 7, 2025
@linear
Copy link

linear bot commented Nov 7, 2025

@tobias-wilfert tobias-wilfert marked this pull request as ready for review November 7, 2025 10:27
@tobias-wilfert tobias-wilfert requested a review from a team as a code owner November 7, 2025 10:27
Comment on lines 48 to 53
if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
&& config.processing_enabled()
{
// if a processing relay has client outcomes disabled we drop them without processing.
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a redundant check, if outcomes are disabled, the aggregator should drop them alltogether 🤔

@jjbayer wdyt, remove this either now or in a follow up?

&& config.processing_enabled()
{
// if a processing relay has client outcomes disabled we drop them without processing.
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should return an error here, this only works because we do not log any outcomes for client reports, but dropping data should be explicit and go through Rejected.

Comment on lines 91 to 92
relay_log::trace!("ignored client outcome with an overlong reason");
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this bypasses the accounting logic of Managed, it just happens to work because there are no outcomes for client reports, but the processing code should work independent of this.

With outcomes for client reports and in a debug build this would panic due to accounting errors.

If you follow the implementation of e.g. the logs processor, you can do the processing in multiple phases:

  • managed.map() -> here you expand the client reports
  • managed.retain() -> here you validate/normalize the client reports
  • managed.accept() -> here you send the client reports to the aggregator

For readability reasons you can also split these 3 phases into 3 different functions.

If you want we can pair next week on making these changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, if we make the effort of porting the code we might as well use the new bookkeeping mechanism.

Comment on lines 48 to 53
if (!config.emit_outcomes().any() || !config.emit_client_outcomes())
&& config.processing_enabled()
{
// if a processing relay has client outcomes disabled we drop them without processing.
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, emit_client_outcomes and emit_outcomes are different config flags so it's not entirely redundant. But I would be fine with making a breaking change and remove emit_client_outcomes from the config altogether.

Comment on lines 91 to 92
relay_log::trace!("ignored client outcome with an overlong reason");
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, if we make the effort of porting the code we might as well use the new bookkeeping mechanism.

Comment on lines +190 to +200
/// Emits the validated client report outcomes to the outcome aggregator.
pub fn emit(
client_reports: Managed<ValidatedClientReports>,
outcome_aggregator: &Addr<TrackOutcome>,
) {
client_reports.accept(|client_reports| {
for outcome in client_reports.outcomes {
outcome_aggregator.send(outcome)
}
})
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we think this is worth its own function else can also move it into validate.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can merge these if it makes sense, the additional step of converting to TrackedOutcome seems a bit weird.

What we can do is merge these two parts together into just an emit, there you can split out individual reports/outcomes using managed.split() then rejecting individual incorrect items with a try_accept.

Roughly:

for outcome in client_reports.split(|client_reports| client_reports.outcomes) {
    outcome.try_accept(|...| {
    })
}

A lot of this is often just trying it out, see how it feels and reads and if whether you think this overall improves the situation if you compare the two options.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above, I would even go further and split up validate, so you get

let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);
process::emit(tracked_outcomes)

IMO these are all high-level building blocks of the processing pipeline.

@tobias-wilfert tobias-wilfert changed the title WIP ref(client-reports): Move Client Reports to the new processing ref(client-reports): Move Client Reports to the new processing Nov 17, 2025
let mut output_events = BTreeMap::new();

for item in client_reports.client_reports {
match ClientReport::parse(&item.payload()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider factoring this match block out in a separate function it reduces nesting overall and in general just increases readability.

You have the expansion function which deals with the container and many items, then you have a dedicated parsing function which deals with an individual item.

If you were to write unittests, that would also be much easier to unittest individually. Tests are often a good way to figure out code could be organized better/differently.

Comment on lines +190 to +200
/// Emits the validated client report outcomes to the outcome aggregator.
pub fn emit(
client_reports: Managed<ValidatedClientReports>,
outcome_aggregator: &Addr<TrackOutcome>,
) {
client_reports.accept(|client_reports| {
for outcome in client_reports.outcomes {
outcome_aggregator.send(outcome)
}
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can merge these if it makes sense, the additional step of converting to TrackedOutcome seems a bit weird.

What we can do is merge these two parts together into just an emit, there you can split out individual reports/outcomes using managed.split() then rejecting individual incorrect items with a try_accept.

Roughly:

for outcome in client_reports.split(|client_reports| client_reports.outcomes) {
    outcome.try_accept(|...| {
    })
}

A lot of this is often just trying it out, see how it feels and reads and if whether you think this overall improves the situation if you compare the two options.

}

ValidatedClientReports { outcomes }
}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Similar as what David mentioned above, I would move the body of the for loop to a separate function.
  2. I think readability would benefit from splitting the conversion to TrackOutcome into a top-level process function, so you get something like
let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);

Comment on lines +190 to +200
/// Emits the validated client report outcomes to the outcome aggregator.
pub fn emit(
client_reports: Managed<ValidatedClientReports>,
outcome_aggregator: &Addr<TrackOutcome>,
) {
client_reports.accept(|client_reports| {
for outcome in client_reports.outcomes {
outcome_aggregator.send(outcome)
}
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above, I would even go further and split up validate, so you get

let client_reports = process::validate(...);
let tracked_outcomes = process::convert(_);
process::emit(tracked_outcomes)

IMO these are all high-level building blocks of the processing pipeline.

Comment on lines +80 to +84
if !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() {
// if a processing relay has client outcomes disabled we drop them without processing.
return Err(client_reports.reject_err(Error::OutcomesDisabled));
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() is logically unreachable, preventing OutcomesDisabled error.
Severity: HIGH | Confidence: High

🔍 Detailed Analysis

The condition !ctx.config.emit_outcomes().any() && ctx.config.processing_enabled() at relay-server/src/processing/client_reports/mod.rs:80-82 is logically unreachable. When ctx.config.processing_enabled() is true, emit_outcomes().any() is always true, making the first part false. When ctx.config.processing_enabled() is false, the second part is false. Consequently, the OutcomesDisabled error path is never taken, causing client reports to always be processed, ignoring the emit_outcomes configuration. This is a functional regression.

💡 Suggested Fix

Correct the condition at relay-server/src/processing/client_reports/mod.rs:80-82 to reflect the intended logic, likely by negating ctx.config.processing_enabled() or reverting to the original condition (!config.emit_outcomes().any() || !config.emit_client_outcomes()) && !config.processing_enabled().

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: relay-server/src/processing/client_reports/mod.rs#L80-L84

Potential issue: The condition `!ctx.config.emit_outcomes().any() &&
ctx.config.processing_enabled()` at
`relay-server/src/processing/client_reports/mod.rs:80-82` is logically unreachable. When
`ctx.config.processing_enabled()` is true, `emit_outcomes().any()` is always true,
making the first part false. When `ctx.config.processing_enabled()` is false, the second
part is false. Consequently, the `OutcomesDisabled` error path is never taken, causing
client reports to always be processed, ignoring the `emit_outcomes` configuration. This
is a functional regression.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference ID: 3173889

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants